Avro support#2468
Conversation
closes spring-cloud#2402 Signed-off-by: Emanuel Trandafir <emanueltrandafir1993@gmail.com>
relates to spring-cloud#2402 Signed-off-by: Emanuel Trandafir <emanueltrandafir1993@gmail.com>
relates to spring-cloud#2404 Signed-off-by: Emanuel Trandafir <emanueltrandafir1993@gmail.com>
```groovy
	return body != null && body.getClientValue() instanceof FromFileProperty;
}

private boolean isAvroContract(YamlContract contract) {
```
Can we make this section Kafka/Avro agnostic? If we just set asBytes() it should work, right? Another option is to verify the contentType, in which case if it's not JSON but explicitly something else, we just pass through? Wdyt?
I also sense that we could have some extension points here. Like checking through an SPI (we already do it in other parts of SCC) some interfaces to verify whether this contract has a special way of treating the payload. We could have some AvroContractPayloadProcessor that would activate when the metadata has avro, and it would provide a function for how to convert the payload. That way this core logic stays clean of Avro, but we can inject the behavior. There would have to be some priority/ordering, like we do in other SPIs here in SCC.
Wdyt?
> If we just set asBytes() it should work right?
By passing bytes, do you mean the 2nd approach demoed here?
If so - yes, it works, but it becomes very hard to peer-review the contract changes and understand test failures.
In fact, if we stick to that approach, we won't need any of this - it mainly covers workarounds needed for the 1st approach described in the samples PR.
> Another option is to verify the contentType in which case if it's not json but explicitly sth else then we just pass through? Wdyt?
Are you suggesting setting a header like `contentType: application/avro` and relying on it here, instead of the new custom field we added to the contract metadata?
It would force users to add the contentType header to the contract even if they don't use it in prod (it is not strictly required, more of a convention some teams use).
This would work - but IMO it sounds a bit sketchy. Wdyt?
What's your concern about using the contract metadata here?
> I also sense that we could have some extension points here
Yes, that's a great idea, actually. We'd have a list of processors; each of them provides a predicate to test the contract (so we can decide whether to apply it) and a function for mapping/extracting the payload.
This way, we can keep using the metadata approach, but extract it somewhere else and keep this logic agnostic of Avro.
You mentioned potentially adding Protobuf later, which might use this extension point too.
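A minimal, self-contained sketch of the extension point discussed above. All names (`ContractPayloadProcessor`, `AvroContractPayloadProcessor`, the `order()` convention) are illustrative assumptions, not the actual SCC SPI:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical SPI: each processor inspects the contract metadata to decide
// whether it applies, and supplies the payload conversion for that contract.
interface ContractPayloadProcessor {
	boolean applies(Map<String, Object> contractMetadata);

	Object convertPayload(Object body);

	// lower value wins, mirroring the priority/ordering used by other SCC SPIs
	default int order() { return 0; }
}

// Activates when the metadata has an "avro" entry; passes the body Map through
// untouched so the Avro-aware sender can build the record from it.
class AvroContractPayloadProcessor implements ContractPayloadProcessor {
	@Override
	public boolean applies(Map<String, Object> contractMetadata) {
		return contractMetadata != null && contractMetadata.containsKey("avro");
	}

	@Override
	public Object convertPayload(Object body) {
		return body;
	}
}

class PayloadProcessing {
	// The core logic stays Avro-agnostic: pick the first applicable processor,
	// otherwise fall back to the default (e.g. JSON) conversion.
	static Object process(List<ContractPayloadProcessor> processors,
			Map<String, Object> metadata, Object body,
			Function<Object, Object> defaultConversion) {
		Optional<ContractPayloadProcessor> match = processors.stream()
				.sorted(Comparator.comparingInt(ContractPayloadProcessor::order))
				.filter(p -> p.applies(metadata))
				.findFirst();
		return match.map(p -> p.convertPayload(body))
				.orElseGet(() -> defaultConversion.apply(body));
	}
}
```

With this shape, the behavior is injected per contract type while the dispatch loop stays generic.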
```yaml
outputMessage:
  sentTo: book.returned
  headers:
    X-Correlation-Id: abc-123-def
```
What headers are being sent in case of avro messages? Is there any content type?
AFAIK, there are no headers strictly required by the Kafka broker or the Avro/Confluent deserializer - the schema info is embedded in the payload. contentType is sometimes added by convention but isn't enforced by anything.
This X-Correlation-Id header in the test is just a custom header used to verify that user-defined headers are propagated correctly. Would you prefer to rename it to something more obviously test-scoped (e.g. X-Dummy-Custom-Header), or add a contentType: application/avro header?
```groovy
private File saveTmpContract(String contractYaml) {
	File contractDir = File.createTempDir()
	new File(contractDir, "book_returned.yml").text = contractYaml
```
```java
@Bean
@ConditionalOnMissingBean(name = "avroKafkaTemplate")
KafkaTemplate<String, Object> avroKafkaTemplate(@Value("${spring.kafka.bootstrap-servers}") String bootstrapServers,
```
Why can't we reuse the users production template?
I was thinking about it: this is meant to be used on the consumer side, so we can have a KafkaTemplate configured differently here (it seems unlikely, but an app could consume Avro and publish JSON).
What do you think about looking for a bean named avroKafkaTemplate first; if not present, falling back to the "prod" kafkaTemplate bean; and if that is not present either, creating one:

```java
if (ctx.containsBean("avroKafkaTemplate")) {
	return (KafkaTemplate<String, Object>) ctx.getBean("avroKafkaTemplate");
}
if (ctx.containsBean("kafkaTemplate")) {
	return (KafkaTemplate<String, Object>) ctx.getBean("kafkaTemplate");
}
return createAvroKafkaTemplate(bootstrapServers, schemaRegistryUrl);
```

My main point is to allow overriding this avroKafkaTemplate used by the stub without altering the prod bean.
What do you think about something like this?
```java
}

@JsonIgnoreProperties({ "schema", "specificData", "classSchema", "conversion" })
interface IgnoreAvroMixin {
```
Can you provide a javadoc why we need this?
Bug fix "ContractVerifierObjectMapper fails for Avro-generated objects" #2402: when the intercepted message is an Avro-generated object, `ContractVerifierObjectMapper` would fail to serialize it to JSON due to Avro-specific fields (`schema`, `specificData`, `classSchema`, `conversion`). Fixed by configuring the `JsonMapper` to ignore those fields via a mixin when Avro is on the classpath.

Avro support for contract-based messaging: introduced `KafkaAvroMessageVerifierSender`, which builds an Avro `GenericRecord` from the contract body and sends it via `KafkaTemplate`, backed by `KafkaAvroContractVerifierConfiguration` (auto-configuration) and `AvroMetadata` (schema config in contract metadata).

Header propagation: `KafkaAvroMessageVerifierSender` now wraps the payload in a `ProducerRecord` and copies contract output headers (e.g. `X-Correlation-Id`) as UTF-8 bytes onto it, so they are actually sent to Kafka.

Bug fix "StubRunnerExecutor fails for Avro objects" #2404: `StubRunnerExecutor.sendMessage()` was unconditionally calling `JsonOutput.toJson()` on the message body before passing it to the `MessageVerifierSender`. This broke `KafkaAvroMessageVerifierSender`, which expects a `Map`, not a JSON string. Fixed by adding an `isAvroContract()` check that inspects the raw contract metadata and skips JSON serialization for Avro contracts, passing the body as a `Map` directly.
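The `isAvroContract()` guard described above can be sketched in plain Java. This is a simplified stand-in, not the actual SCC code: `Map<String, Object>` replaces the real `YamlContract` metadata, and `toJson` stands in for `JsonOutput.toJson(...)`:

```java
import java.util.Map;

class AvroContractGuard {

	// A contract counts as Avro when its metadata carries an "avro" entry
	// (the new custom field mentioned in the discussion above).
	static boolean isAvroContract(Map<String, Object> contractMetadata) {
		return contractMetadata != null && contractMetadata.containsKey("avro");
	}

	// Skip JSON serialization for Avro contracts: pass the body Map through so
	// the Avro sender can build the GenericRecord; otherwise keep the existing
	// JSON behavior.
	static Object prepareBody(Map<String, Object> contractMetadata, Object body) {
		if (isAvroContract(contractMetadata)) {
			return body;
		}
		return toJson(body);
	}

	// Stand-in for JsonOutput.toJson(...); only the branching matters here.
	static String toJson(Object body) {
		return String.valueOf(body);
	}
}
```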
TODO: if this gets accepted, we also need to add the documentation and update the samples repo.
Related issues